netty 服务器端和客户端(超时机制)

您所在的位置:网站首页 netty 服务端和客户端 netty 服务器端和客户端(超时机制)

netty 服务器端和客户端(超时机制)

2024-06-06 07:48| 来源: 网络整理| 查看: 265

一、概念介绍网络中的接收和发送数据都是使用操作系统中的SOCKET进行实现。但是如果此套接字已经断开,那发送数据和接收数据的时候就一定会有问题。可是如何判断这个套接字是否还可以使用呢?这个就需要在系统中创建心跳机制。其实TCP中已经为我们实现了一个叫做心跳的机制。如果你设置了心跳,那TCP就会在一定的时间(比如你设置的是3秒钟)内发送你设置的次数的心跳(比如说2次),并且此信息不会影响你自己定义的协议。所谓“心跳”就是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着。 以确保链接的有效性。

所谓的心跳包就是客户端定时发送简单的信息给服务器端告诉它我还在而已。代码就是每隔几分钟发送一个固定信息给服务端,服务端收到后回复一个固定信息如果服务端几分钟内没有收到客户端信息则视客户端断开。比如有些通信软件长时间不使用,要想知道它的状态是在线还是离线就需要心跳包,定时发包收包。发包方:可以是客户也可以是服务端,看哪边实现方便合理。一般是客户端。服务器也可以定时轮询发心跳下去。心跳包之所以叫心跳包是因为:它像心跳一样每隔固定时间发一次,以此来告诉服务器,这个客户端还活着。事实上这是为了保持长连接,至于这个包的内容,是没有什么特别规定的,不过一般都是很小的包,或者只包含包头的一个空包。

在TCP的机制里面,本身是存在有心跳包的机制的,也就是TCP的选项。系统默认是设置的是2小时的心跳频率。但是它检查不到机器断电、网线拔出、防火墙这些断线。而且逻辑层处理断线可能也不是那么好处理。一般,如果只是用于保活还是可以的。心跳包一般来说都是在逻辑层发送空的包来实现的。下一个定时器,在一定时间间隔下发送一个空包给客户端,然后客户端反馈一个同样的空包回来,服务器如果在一定时间内收不到客户端发送过来的反馈包,那就只有认定说掉线了。只需要send或者recv一下,如果结果为零,则为掉线。

但是,在长连接下,有可能很长一段时间都没有数据往来。理论上说,这个连接是一直保持连接的,但是实际情况中,如果中间节点出现什么故障是难以知道的。更要命的是,有的节点(防火墙)会自动把一定时间之内没有数据交互的连接给断掉。在这个时候,就需要我们的心跳包了,用于维持长连接,保活。在获知了断线之后,服务器逻辑可能需要做一些事情,比如断线后的数据清理呀,重新连接呀当然,这个自然是要由逻辑层根据需求去做了。总的来说,心跳包主要也就是用于长连接的保活和断线处理。一般的应用下,判定时间在30-40秒比较不错。如果实在要求高,那就在6-9秒。

二、心跳实现使用TCP协议层的Keeplive机制,但是该机制默认的心跳时间是2小时,依赖操作系统实现不够灵活;

心跳机制一般来说都是在逻辑层发送空的包来实现的,比如Netty的IdleStateHandler类实现心跳机制。

心跳机制实现逻辑:每隔几分钟发送一个固定信息给服务端,服务端收到后回复一个固定信息给客户端,如果服务端几分钟内没有收到客户端信息则视客户端断开。

在Netty中IdleStateHandler主要用来检测远端是否存活,如果不存活或活跃则对空闲Socket连接进行处理避免资源的浪费;IdleStateHandler实现对三种心跳的检测,分别是readerIdleTime、writerIdleTime和allIdleTime,参数解释如下: 1)readerIdleTime:读超时时间2)writerIdleTime:写超时时间3)allIdleTime:所有类型的超时时间

所以在channelPipeline中加入IdleStateHandler,我们在handler中提示的是5秒读,所以我们服务端的配置的是:

ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));

因为服务端必须5秒接受一次心跳请求,那么客户端的配置:

ph.addLast( new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));

userEventTriggered是Netty 处理心跳超时事件,在IdleStateHandler设置超时时间,如果达到了,就会直接调用该方法。如果没有超时则不调用。我们重写该方法的话,就可以自行进行相关的业务逻辑处理了。

三、IdleStateHandler心跳检测实例a、服务端HeartNettyServer——服务端启动类

复制代码 package com.dxfx.netty.demo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * 服务端启动类 * * @author Administrator * */ public class HeartNettyServer { public static void main(String[] args) throws InterruptedException { // 首先,netty通过ServerBootstrap启动服务端 ServerBootstrap server = new ServerBootstrap(); EventLoopGroup parentGroup = new NioEventLoopGroup(); EventLoopGroup childGroup =new NioEventLoopGroup(); //第1步定义两个线程组,用来处理客户端通道的accept和读写事件 //parentGroup用来处理accept事件,childgroup用来处理通道的读写事件 //parentGroup获取客户端连接,连接接收到之后再将连接转发给childgroup去处理 server.group(parentGroup, childGroup); //用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。 //用来初始化服务端可连接队列 //服务端处理客户端连接请求是按顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。 server.option(ChannelOption.SO_BACKLOG, 128); //第2步绑定服务端通道 server.channel(NioServerSocketChannel.class); //第3步绑定handler,处理读写事件,ChannelInitializer是给通道初始化 server.childHandler(new HeartNettyServerFilter()); //第4步绑定8080端口 ChannelFuture future = server.bind(8080).sync(); //当通道关闭了,就继续往下走 future.channel().closeFuture().sync(); } } 复制代码

HeartNettyServerFilter——服务端过滤器,如编解码和心跳的设置

复制代码 package com.dxfx.netty.demo; import java.util.concurrent.TimeUnit; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; /** * 服务端过滤器,如编解码和心跳的设置 * * @author Administrator * */ public class HeartNettyServerFilter extends ChannelInitializer { @Override protected void initChannel(SocketChannel sc) throws Exception { ChannelPipeline cp = sc.pipeline(); cp.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); // 解码和编码,应和客户端一致 cp.addLast(new StringDecoder()); cp.addLast(new StringEncoder()); //处理服务端的业务逻辑 cp.addLast(new HeartNettyServerHandler()); } } 复制代码

HeartNettyServerHandler——处理服务端业务逻辑:心跳超时处理、客服端返回的数据处理

复制代码 package com.dxfx.netty.demo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; /** * 处理服务端业务逻辑:心跳超时处理、客服端返回的数据处理 * * @author Administrator * */ public class HeartNettyServerHandler extends ChannelInboundHandlerAdapter { /** 空闲次数 */ private int idle_count = 1; /** 发送次数 */ private int count = 1; /** * 超时处理,如果5秒没有收到客户端的心跳,就触发; 如果超过两次,则直接关闭; */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { if (obj instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) obj; if (IdleState.READER_IDLE.equals(event.state())) { // 如果读通道处于空闲状态,说明没有接收到心跳命令 if (idle_count > 2) { System.out.println("超过两次无客户端请求,关闭该channel"); ctx.channel().close(); } System.out.println("已等待5秒还没收到客户端发来的消息"); idle_count++; } } else { super.userEventTriggered(ctx, obj); } } /** * 业务逻辑处理 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("第" + count + "次" + ",服务端收到的消息:" + msg); String message = (String) msg; // 如果是心跳命令,服务端收到命令后回复一个相同的命令给客户端 if ("hb_request".equals(message)) { ctx.write("服务端成功收到心跳信息"); ctx.flush(); } count++; } /** * 异常处理 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } 复制代码

b、客户端HeartNettyClient——客户端启动类

复制代码 package com.dxfx.netty.demo; import java.io.IOException; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; /** * 客户端启动类 * * @author Administrator * */ public class HeartNettyClient { public static void main(String[] args) throws InterruptedException, IOException { // 首先,netty通过Bootstrap启动客户端 Bootstrap client = new Bootstrap(); // 第1步 定义线程组,处理读写和链接事件,没有了accept事件 EventLoopGroup group = new NioEventLoopGroup(); client.group(group); // 第2步 绑定客户端通道 client.channel(NioSocketChannel.class); // 第3步 给NIoSocketChannel初始化handler, 处理读写事件 client.handler(new HeartNettyClientFilter()); // 连接服务端 Channel future = client.connect("localhost", 8080).sync().channel(); //给服务端发送数据 String str = "Hello Netty"; future.writeAndFlush(str); System.out.println("客户端发送数据:" + str); } } 复制代码

HeartNettyClientFilter——客户端过滤器,如编解码和心跳的设置

复制代码 package com.dxfx.netty.demo; import java.util.concurrent.TimeUnit; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; /** * 客户端过滤器,如编解码和心跳的设置 * * @author Administrator * */ public class HeartNettyClientFilter extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline ph = ch.pipeline(); //因为服务端设置的超时时间是5秒,所以客户端设置4秒 ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS)); // 解码和编码,应和服务端一致 ph.addLast(new StringDecoder()); ph.addLast(new StringEncoder()); //处理客户端的业务逻辑 ph.addLast(new HeartNettyClientHandler()); } } 复制代码

 

HeartNettyClientHandler——处理客户端业务逻辑:心跳超时处理、服务端返回的数据处理

复制代码 package com.dxfx.netty.demo; import java.text.SimpleDateFormat; import java.util.Date; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; /** * 处理客户端业务逻辑:心跳超时处理、服务端返回的数据处理 * * @author Administrator * */ public class HeartNettyClientHandler extends ChannelInboundHandlerAdapter { /** 客户端请求的心跳命令 */ private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("hb_request", CharsetUtil.UTF_8)); /** 空闲次数 */ private int idle_count = 1; /** 发送次数 */ private int count = 1; /** 循环次数 */ private int fcount = 1; /** * 建立连接时 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("建立连接时:" + date()); ctx.fireChannelActive(); } /** * 关闭连接时 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("关闭连接时:" + date()); } /** * 心跳请求处理,每4秒发送一次心跳请求; * */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { System.out.println("\r\n循环请求的时间:" + date() + ",次数" + fcount); if (obj instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) obj; if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果写通道处于空闲状态就发送心跳命令 // 设置发送次数,允许发送3次心跳包 if (idle_count


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3